package com.stars.distributed.schedule.mq.rocketmq.apache;

import com.alibaba.fastjson.JSON;
import com.stars.distributed.schedule.exception.DistributedScheduleMQException;
import com.stars.distributed.schedule.mq.MQMessageSender;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * {@link MQMessageSender} implementation that publishes messages through Apache RocketMQ.
 *
 * <p>Each payload is serialized to JSON with fastjson, encoded as UTF-8 and sent
 * synchronously. The producer is created lazily on first use and registered with
 * Spring's {@link TransactionSynchronizationManager} using this sender instance as
 * the resource key; later calls that find a bound producer reuse it.
 *
 * <p>NOTE(review): nothing in this class unbinds or shuts down the bound producer.
 * Spring documents these resources as bound to the current thread, so presumably one
 * producer is started per calling thread - confirm that cleanup happens elsewhere.
 *
 * @author guoguifang
 */
public class RocketMQMessageSender implements MQMessageSender {

    /** Name server address passed to every producer created by this sender. */
    private final String nameServer;

    /** Producer group name used when constructing a producer. */
    private final String groupName;

    /**
     * Creates a sender. No producer is started until the first message is sent.
     *
     * @param nameServer address handed to {@code DefaultMQProducer#setNamesrvAddr}
     * @param groupName  producer group name handed to the {@code DefaultMQProducer} constructor
     */
    public RocketMQMessageSender(String nameServer, String groupName) {
        this.nameServer = nameServer;
        this.groupName = groupName;
    }

    /**
     * Serializes {@code message} to JSON and sends it to {@code topic}.
     *
     * @param topic   destination topic
     * @param message payload; converted with {@code JSON.toJSONString} and encoded as UTF-8
     * @return {@code true} if the send status is {@code SEND_OK}, {@code false} for any other status
     * @throws DistributedScheduleMQException if the producer cannot be obtained or the send throws;
     *                                        the original exception is kept as the cause
     */
    @Override
    public boolean convertAndSend(String topic, Object message) throws DistributedScheduleMQException {
        try {
            // The charset constant avoids the by-name lookup and its checked UnsupportedEncodingException.
            byte[] body = JSON.toJSONString(message).getBytes(StandardCharsets.UTF_8);
            Message rocketMessage = new Message(topic, body);
            SendResult result = getRocketMQProducer().send(rocketMessage);
            return result.getSendStatus() == SendStatus.SEND_OK;
        } catch (DistributedScheduleMQException e) {
            // Already carries its own context (e.g. producer start failure); do not wrap it again.
            throw e;
        } catch (Exception e) {
            throw new DistributedScheduleMQException("Use rocketMQ send message to topic [" + topic + "] failure!", e);
        }
    }

    /**
     * Returns the producer bound to this sender in {@link TransactionSynchronizationManager},
     * creating, starting and binding a new one when none is bound.
     *
     * @return a started producer
     * @throws DistributedScheduleMQException if the producer cannot be configured, started or bound
     */
    private MQProducer getRocketMQProducer() throws DistributedScheduleMQException {
        DefaultMQProducer mqProducer = (DefaultMQProducer) TransactionSynchronizationManager.getResource(this);
        if (mqProducer == null) {
            try {
                mqProducer = new DefaultMQProducer(groupName);
                mqProducer.setNamesrvAddr(nameServer);
                // Every producer created here gets its own random instance name.
                mqProducer.setInstanceName(UUID.randomUUID().toString());
                mqProducer.setVipChannelEnabled(false);
                mqProducer.start();
                // Bind only after a successful start so a half-initialized producer is never reused.
                TransactionSynchronizationManager.bindResource(this, mqProducer);
            } catch (Exception e) {
                throw new DistributedScheduleMQException("Registered rocketMQ send transaction resource failed!", e);
            }
        }
        return mqProducer;
    }
}
